A workflow mixin (PubSubMixin) that turns any workflow into a pub/sub
broker. Activities and starters publish via batched signals; external
clients subscribe via long-poll updates exposed as an async iterator.

Key design decisions:
- Payloads are opaque bytes for cross-language compatibility
- Topics are plain strings, no hierarchy or prefix matching
- Global monotonic offsets (not per-topic) for simple continuation
- Batching built into PubSubClient with Nagle-like timer + priority flush
- Structured concurrency: no fire-and-forget tasks, trio-compatible
- Continue-as-new support: drain_pubsub() + get_pubsub_state() + validator
  to cleanly drain polls, plus follow_continues on the subscriber side

Module layout:
_types.py — PubSubItem, PublishInput, PollInput, PollResult, PubSubState
_mixin.py — PubSubMixin (signal, update, query handlers)
_client.py — PubSubClient (batcher, async iterator, CAN resilience)

9 E2E integration tests covering: activity publish + subscribe, topic
filtering, offset-based replay, interleaved workflow/activity publish,
priority flush, iterator cancellation, context manager flush, concurrent
subscribers, and mixin coexistence with application signals/queries.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
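The broker core described in this commit (a single append-only log, global integer offsets, opaque byte payloads, plain-string topic filtering) can be sketched as a minimal Python model, independent of Temporal; all names here are illustrative, not the module's actual API:

```python
from dataclasses import dataclass, field


@dataclass
class Item:
    topic: str
    data: bytes  # opaque payload: the broker never inspects it


@dataclass
class BrokerLog:
    """Toy model of the broker's append-only log with global offsets."""

    log: list[Item] = field(default_factory=list)

    def publish(self, topic: str, data: bytes) -> None:
        self.log.append(Item(topic, data))

    def poll(self, topics: set[str], from_offset: int) -> tuple[list[Item], int]:
        """Return items in `topics` at or after `from_offset`, plus the next offset.

        Offsets are global (positions in the single log), so one integer is
        enough to continue a subscription across any mix of topics.
        """
        matched = [it for it in self.log[from_offset:] if it.topic in topics]
        return matched, len(self.log)
```

A subscriber holds only the returned next-offset between polls; resubscribing with it resumes exactly where the previous poll stopped, regardless of topic filter.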
PubSubState is now a Pydantic model so it survives serialization through
Pydantic-based data converters when embedded in Any-typed fields. Without
this, continue-as-new would fail with "'dict' object has no attribute
'log'" because Pydantic deserializes Any fields as plain dicts.

Added two CAN tests:
- test_continue_as_new_any_typed_fails: documents that Any-typed fields
  lose PubSubState type information (negative test)
- test_continue_as_new_properly_typed: verifies CAN works with properly
  typed PubSubState | None fields

Simplified subscribe() exception handling: removed the broad except
Exception clause that tried _follow_continue_as_new() on every error. Now
only catches WorkflowUpdateRPCTimeoutOrCancelledError for CAN follow.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

README.md: usage-oriented documentation covering the workflow mixin,
activity publishing, subscribing, continue-as-new, and the cross-language
protocol.

flush() safety: items are now removed from the buffer only after the
signal succeeds. Previously, buffer.clear() ran before the signal, losing
items on failure. Added test_flush_retains_items_on_signal_failure.

init_pubsub() guard: publish() and the _pubsub_publish signal handler now
check for initialization and raise a clear RuntimeError instead of a
cryptic AttributeError.

PubSubClient.for_workflow() factory: preferred constructor that takes a
Client + workflow_id. Enables follow_continues in subscribe() without
accessing the private WorkflowHandle._client. The handle-based
constructor remains for simple cases that don't need CAN following.

activity_pubsub_client() now uses for_workflow() internally with proper
keyword-only typed arguments instead of **kwargs: object.

CAN test timing: replaced asyncio.sleep(2) with assert_eq_eventually
polling for a different run_id, matching sdk-python test patterns.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
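The flush() safety fix ("remove from the buffer only after the signal succeeds") can be modeled in a few lines of plain Python; `signal_fn` stands in for the Temporal signal RPC and is hypothetical:

```python
class Batcher:
    """Toy model of the flush-safety fix: buffered items survive a failed send."""

    def __init__(self, signal_fn):
        self._signal = signal_fn
        self.buffer: list[bytes] = []

    def publish(self, data: bytes) -> None:
        self.buffer.append(data)

    def flush(self) -> None:
        if not self.buffer:
            return
        batch = list(self.buffer)
        self._signal(batch)            # may raise; buffer untouched until it returns
        del self.buffer[: len(batch)]  # drop only what was actually sent
```

The buggy version called `self.buffer.clear()` before the send, so a raised exception lost the whole batch.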
_pubsub_poll and _pubsub_offset now call _check_initialized() for a clear
RuntimeError instead of a cryptic AttributeError when init_pubsub() is
forgotten.

The README CAN example now includes the required imports (@dataclass,
workflow) and the @workflow.init decorator.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

The poll validator accesses _pubsub_draining, which would AttributeError
if init_pubsub() was never called. Added a _check_initialized() guard.

Fixed the PubSubState docstring: the field must be typed as
PubSubState | None, not Any. The old docstring incorrectly implied that
Any-typed fields would work.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

get_pubsub_state() and drain_pubsub() now call _check_initialized().
Previously drain_pubsub() could silently set _pubsub_draining on an
uninitialized instance, which init_pubsub() would then reset to False.

New tests:
- test_max_batch_size: verifies auto-flush when the buffer reaches its
  limit, using max_cached_workflows=0 to also test replay safety
- test_replay_safety: interleaved workflow/activity publish with
  max_cached_workflows=0, proving the mixin is determinism-safe

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

Review comments (#@agent: annotations) capture design questions on:
- Topic offset model and information leakage (resolved: global offsets
  with BFF-layer containment, per the NATS JetStream model)
- Exactly-once publish delivery (resolved: publisher ID + sequence number
  dedup, per the Kafka producer model)
- Flush concurrency (resolved: asyncio.Lock with buffer swap)
- CAN follow behavior, poll rate limiting, activity context detection,
  validator purpose, pyright errors, API ergonomics

DESIGN-ADDENDUM-TOPICS.md: full exploration of per-topic vs global
offsets with an industry survey (Kafka, Redis, NATS, PubNub, Google
Pub/Sub, RabbitMQ). Concludes global offsets are correct for
workflow-scoped pub/sub; leakage is contained at the BFF trust boundary.

DESIGN-ADDENDUM-DEDUP.md: exactly-once delivery via publisher ID +
monotonic sequence number. Workflow dedup state is dict[str, int],
bounded by publisher count. Buffer swap pattern with sequence reuse on
failure. PubSubState carries publisher_sequences through CAN.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Types:
- Remove offset from PubSubItem (the global offset is now derived)
- Add publisher_id + sequence to PublishInput for exactly-once dedup
- Add base_offset + publisher_sequences to PubSubState for CAN
- Use Field(default_factory=...) for Pydantic mutable defaults

Mixin:
- Add _pubsub_base_offset for future log truncation support
- Add _pubsub_publisher_sequences for signal deduplication
- Dedup in the signal handler: reject if sequence <= last seen
- Poll uses base_offset arithmetic for offset translation
- Class-body type declarations for basedpyright compatibility
- Validator docstring explaining the drain/CAN interaction
- Module docstring gives specific init_pubsub() guidance

Client:
- asyncio.Lock + buffer swap for flush concurrency safety
- Publisher ID (uuid) + monotonic sequence for exactly-once delivery
- Sequence advances on failure to prevent data loss when new items merge
  with the retry batch (found via Codex review)
- Remove the follow_continues param — always follow CAN via describe()
- Configurable poll_interval (default 0.1s) for rate limiting
- Merge activity_pubsub_client() into for_workflow() with auto-detect
- _follow_continue_as_new is async with a describe() check

Tests:
- New test_dedup_rejects_duplicate_signal
- Updated flush failure test for the new sequence semantics
- All activities use PubSubClient.for_workflow()
- Remove PubSubItem.offset assertions
- poll_interval=0 in the test helper for speed

Docs:
- DESIGN-v2.md: consolidated design doc superseding the original + addenda
- README.md: updated API reference
- DESIGN-ADDENDUM-DEDUP.md: corrected flush failure semantics

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
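The workflow-side dedup rule named above ("reject if sequence <= last seen") is small enough to model directly; this is an illustrative sketch, not the mixin's actual handler:

```python
def handle_publish(
    publisher_sequences: dict[str, int],
    log: list[bytes],
    publisher_id: str,
    sequence: int,
    items: list[bytes],
) -> bool:
    """Toy signal handler: append the batch unless it was already seen.

    Dedup state is one integer per publisher, so it is bounded by the
    number of publishers, not by the number of messages.
    """
    if sequence <= publisher_sequences.get(publisher_id, 0):
        return False  # duplicate delivery of an earlier batch; drop it
    publisher_sequences[publisher_id] = sequence
    log.extend(items)
    return True
```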
Rewrite the client-side dedup algorithm to match the formally verified
TLA+ protocol: failed flushes keep a separate _pending batch and retry
with the same sequence number. Only advance the confirmed sequence on
success. TLC proves NoDuplicates and OrderPreserved for the correct
algorithm, and finds duplicates in the old algorithm.

Add TTL-based pruning of publisher dedup entries during continue-as-new
(default 15 min). Add max_retry_duration (default 600s) to bound client
retries — must be less than publisher_ttl for safety. Both constraints
are formally verified in PubSubDedupTTL.tla.

Add truncate_pubsub() for explicit log prefix truncation. Add
publisher_last_seen timestamps for TTL tracking. Preserve legacy state
without timestamps during upgrade.

API changes: for_workflow → create, flush removed (use priority=True),
poll_interval → poll_cooldown, publisher ID shortened to 16 hex chars.

Includes TLA+ specs (correct, broken, inductive, multi-publisher TTL),
PROOF.md with per-action preservation arguments, scope and limitations.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
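The verified client protocol (keep the failed batch in `_pending`, retry with the same sequence number, confirm only on success) can be sketched as a small model; `send` stands in for the signal RPC and the workflow-side dedup, and all names are illustrative:

```python
class DedupClient:
    """Toy model of the verified client protocol.

    A failed flush keeps its batch in `pending` and retries with the SAME
    sequence number; the confirmed sequence advances only once the send
    returns successfully. New items published during a retry stay in
    `buffer` and are never merged into the in-flight batch.
    """

    def __init__(self, send):
        self._send = send              # send(seq, batch) -> None, may raise
        self._seq = 0                  # last confirmed sequence
        self.buffer: list[bytes] = []
        self.pending: list[bytes] = []
        self._pending_seq = 0

    def publish(self, data: bytes) -> None:
        self.buffer.append(data)

    def flush(self) -> None:
        if not self.pending:           # start a new batch only when none is in flight
            self.pending, self.buffer = self.buffer, []
            self._pending_seq = self._seq + 1
        self._send(self._pending_seq, self.pending)  # retries reuse _pending_seq
        self._seq = self._pending_seq  # confirm on success only
        self.pending = []
```

The failure mode TLC found in the old algorithm was exactly the "lost ack" case: the signal is delivered but the RPC fails, and a retry under a fresh sequence would re-append the same items.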
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
New analysis document evaluates whether publishing should use signals or
updates, examining Temporal's native dedup (Update ID per run, request_id
for RPCs) vs the application-level (publisher_id, sequence) protocol.
Conclusion: app-level dedup is permanent for signals but could be dropped
for updates once temporal/temporal#6375 is fixed. Non-blocking flush
keeps signals as the right choice for streaming.

Updates DESIGN-v2.md section 6 to be precise about the two Temporal
guarantees that signal ordering relies on: sequential send order and
history-order handler invocation.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

Analyzes deduplication through the end-to-end principle lens. Three types
of duplicates exist in the pipeline, each handled at the layer that
introduces them:
- Type A (duplicate LLM work): belongs at the application layer — data
  escapes to consumers before the duplicate exists, so only the
  application can resolve it
- Type B (duplicate signal batches): belongs in the pub/sub workflow —
  it encapsulates transport details and is the only layer that can
  detect them correctly
- Type C (duplicate SSE delivery): belongs at the BFF/browser layer

Concludes the (publisher_id, sequence) protocol is correctly placed.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

… design

Fill gaps identified during design review:
- Document why per-topic offsets were rejected (trust model, cursor
  portability, unjustified complexity) inline rather than only in the
  historical addendum
- Expand the BFF section with the four reconnection options considered
  and the decision to use SSE Last-Event-ID with BFF-assigned gapless IDs
- Add poll efficiency characteristics (O(new items) common case)
- Document the BFF restart fallback (replay from turn start)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

Wire types (PublishEntry, _WireItem, PollResult, PubSubState) encode data
as base64 strings for cross-language compatibility across all Temporal
SDKs. User-facing types (PubSubItem) use native bytes. Conversion happens
inside the handlers:
- The signal handler decodes base64 → bytes on ingest
- The poll handler encodes bytes → base64 on response
- Client publish() accepts bytes and encodes for the signal
- Client subscribe() decodes the poll response and yields bytes

This means Go/Java/.NET ports get cross-language compat for free, since
their JSON serializers encode byte[] as base64 by default.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
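The base64 boundary described above is a pair of tiny helpers; this is an illustrative sketch (the real conversions live inside the signal and poll handlers):

```python
import base64


def encode_wire(data: bytes) -> str:
    """bytes -> base64 string, so any JSON-based Temporal SDK can carry it."""
    return base64.b64encode(data).decode("ascii")


def decode_wire(data: str) -> bytes:
    """base64 string -> bytes, the inverse applied on ingest."""
    return base64.b64decode(data)
```

The round trip is lossless for arbitrary bytes, which is what makes payloads opaque to the broker.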
> class PubSubState(BaseModel):
>     """Serializable snapshot of pub/sub state for continue-as-new.
>
>     This is a Pydantic model (not a dataclass) so that Pydantic-based data

Reviewer: In this case it needs to somehow be clear that the pydantic
data converter is required for this.

Author: Thanks for pointing this out. It turns out we can remove the
Pydantic dependency.
Remove the bounded poll wait from PubSubMixin and trim trailing
whitespace from the types. Update DESIGN-v2.md with the streaming plugin
rationale (no fencing needed, the UI handles repeat delivery).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

Add an opt-in streaming code path to both agent framework plugins. When
enabled, the model activity calls the streaming LLM endpoint, publishes
TEXT_DELTA/THINKING_DELTA/TOOL_CALL_START events via PubSubClient as a
side channel, and returns the complete response for the workflow to
process (unchanged interface).

OpenAI Agents SDK:
- ModelActivityParameters.enable_streaming flag
- New invoke_model_activity_streaming method on ModelActivity
- ModelResponse reconstructed from ResponseCompletedEvent
- Uses @_auto_heartbeater for periodic heartbeats
- Routing in _temporal_model_stub (rejects local activities)

Google ADK:
- TemporalModel(streaming=True) constructor parameter
- New invoke_model_streaming activity using stream=True
- Registered in GoogleAdkPlugin

Both use batch_interval=0.1s for near-real-time token delivery. No
pubsub module changes needed.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

The Pydantic BaseModel was introduced as a workaround for Any-typed
fields losing type information during continue-as-new serialization. The
actual fix is using concrete type annotations (PubSubState | None),
which the default data converter handles correctly for dataclasses — no
Pydantic dependency needed.

This removes the pydantic import from the pubsub contrib module
entirely, making it work out of the box with the default data converter.
All 18 tests pass, including both continue-as-new tests.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Implements DESIGN-ADDENDUM-ITEM-OFFSET.md. The poll handler now annotates
each item with its global offset (base_offset + position in log),
enabling subscribers to track fine-grained consumption progress for
truncation. This is needed for the voice-terminal agent, where audio
chunks must not be truncated until actually played, not merely received.

- Add an offset field to PubSubItem and _WireItem (default 0)
- The poll handler computes offset from base_offset + log_offset +
  enumerate index
- subscribe() passes wire_item.offset through to the yielded PubSubItem
- Tests: per-item offsets, offsets with topic filtering, offsets after
  truncation

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
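The per-item offset arithmetic is simple enough to show inline; a minimal sketch of the poll-side annotation, with illustrative names (the real handler works on wire items, not raw bytes):

```python
def annotate_offsets(
    log: list[bytes], base_offset: int, from_offset: int
) -> list[tuple[int, bytes]]:
    """Pair each returned item with its global offset.

    `base_offset` is the global offset of log[0] (items before it were
    truncated), so the item at log index i has global offset
    base_offset + i. A from_offset below the base clamps to the base.
    """
    start = max(from_offset - base_offset, 0)
    return [(base_offset + start + i, item) for i, item in enumerate(log[start:])]
```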
Documents the motivation and design for adding offset fields to
PubSubItem and _WireItem, enabling subscribers to track consumption at
item granularity rather than batch boundaries. Driven by the
voice-terminal agent's need to truncate only after audio playback, not
just after receipt.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

Three changes:
1. Poll handler: replace ValueError with
   ApplicationError(non_retryable=True) when the requested offset has
   been truncated. This fails the UPDATE (the client gets the error)
   without crashing the WORKFLOW TASK — avoiding the poison pill during
   replay that caused permanent workflow failures.
2. Poll handler: treat from_offset=0 as "from the beginning of whatever
   exists" (i.e., from base_offset). This lets subscribers recover from
   truncation by resubscribing from 0 without knowing the current base.
3. PubSubClient.subscribe(): catch WorkflowUpdateFailedError with type
   TruncatedOffset and retry from offset 0, auto-recovering.

New tests:
- test_poll_truncated_offset_returns_application_error
- test_poll_offset_zero_after_truncation
- test_subscribe_recovers_from_truncation

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
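The truncation-recovery contract from the three changes above can be modeled in a few lines of plain Python; `Truncated` stands in for the non-retryable ApplicationError, and all names are illustrative:

```python
class Truncated(Exception):
    """Stand-in for the non-retryable ApplicationError("TruncatedOffset")."""


def poll(log: list[bytes], base_offset: int, from_offset: int) -> list[bytes]:
    if from_offset == 0:
        from_offset = base_offset  # 0 means "start of whatever still exists"
    if from_offset < base_offset:
        raise Truncated(from_offset)  # requested data was truncated away
    return log[from_offset - base_offset:]


def subscribe_once(log: list[bytes], base_offset: int, from_offset: int) -> list[bytes]:
    """Toy subscriber: on a truncation error, retry once from offset 0."""
    try:
        return poll(log, base_offset, from_offset)
    except Truncated:
        return poll(log, base_offset, 0)
```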
Verify that PubSubClient can subscribe to events from a different
workflow (same namespace) and that Nexus operations can start pub/sub
broker workflows in a separate namespace, with cross-namespace
subscription working end-to-end. No library changes needed.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

Poll responses now estimate wire size (base64 data + topic) and stop
adding items once the response exceeds 1 MB. The new `more_ready` flag on
PollResult tells the subscriber that more data is available, so it skips
the poll_cooldown sleep and immediately re-polls. This avoids unnecessary
latency during big reloads or catch-up scenarios while keeping individual
update payloads within Temporal's recommended limits.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
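The size cap plus `more_ready` behavior amounts to a bounded take over the pending items; a minimal sketch with an assumed 1 MB limit and illustrative names:

```python
LIMIT = 1_000_000  # assumed ~1 MB estimated wire size per poll response


def bounded_poll(items: list[tuple[str, str]]) -> tuple[list[tuple[str, str]], bool]:
    """Add (topic, base64_data) items until the estimated size would exceed
    the limit. A True `more_ready` tells the subscriber to re-poll
    immediately instead of sleeping the poll cooldown."""
    out: list[tuple[str, str]] = []
    size = 0
    for topic, data in items:
        if out and size + len(topic) + len(data) > LIMIT:
            return out, True  # more data is ready right now
        out.append((topic, data))
        size += len(topic) + len(data)
    return out, False
```

Note the first item is always included even if oversized, so a single huge item cannot wedge the poll loop.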
Codify the four wire evolution rules that have been followed implicitly
through four addenda: additive-only fields with defaults, immutable
handler names, forward-compatible PubSubState, and no application-level
version negotiation. Includes a precedent table showing all past changes
and the reasoning for why version fields in payloads would cause silent
data loss on signals.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

After max_retry_duration expired, the client dropped the pending batch
without advancing _sequence. The next batch reused the same sequence
number, which could be silently deduplicated by the workflow if the
timed-out signal was actually delivered — causing permanent data loss
for those items. The fix advances _sequence to _pending_seq before
clearing _pending, ensuring subsequent batches always get a fresh
sequence number.

TLA+ verification:
- Added DropPendingBuggy/DropPendingFixed actions to PubSubDedup.tla
- Added a SequenceFreshness invariant:
  (pending = <<>>) => (confirmed_seq >= wf_last_seq)
- BuggyDropSpec FAILS SequenceFreshness (confirmed_seq=0 < wf_last_seq=1)
- FixedDropSpec PASSES all invariants (489 distinct states)
- NoDuplicates passes for both — the bug causes data loss, not duplicates

Python test:
- test_retry_timeout_sequence_reuse_causes_data_loss demonstrates the
  end-to-end consequence: reused seq=1 is rejected, fresh seq=2 accepted

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
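The drop-after-timeout fix is one line of state transition; a toy model with illustrative names (the real client tracks this inside its flush loop):

```python
class RetryState:
    """Toy model of the drop-pending fix: when an in-flight batch is
    abandoned after the retry deadline, the confirmed sequence must jump
    to the pending sequence, so the next batch never reuses a number the
    workflow may already have recorded."""

    def __init__(self) -> None:
        self.seq = 0          # last confirmed sequence
        self.pending_seq = 0  # sequence of the in-flight batch, if any
        self.pending: list[bytes] = []

    def start_batch(self, items: list[bytes]) -> int:
        self.pending, self.pending_seq = items, self.seq + 1
        return self.pending_seq

    def drop_pending(self) -> None:
        self.seq = self.pending_seq  # the fix: advance past the dead batch
        self.pending = []
```

Without the `self.seq = self.pending_seq` line, the next `start_batch` would reissue the dropped sequence number, and a workflow that had actually received the timed-out signal would silently dedup the new data.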
# Conflicts:
#	temporalio/contrib/google_adk_agents/_model.py
This is a new release with no legacy to support. Changes:
- _mixin.py: Remove the ts-is-None fallback that retained publishers
  without timestamps. All publishers always have timestamps, so this was
  dead code.
- _types.py: Clean up docstrings referencing the addendum docs
- DESIGN-v2.md: Remove backward-compat framing, addendum references, and
  the historical file listing. Keep the actual evolution rules.
- PROOF.md: "Legacy publisher_id" → "Empty publisher_id"
- README.md: Reference DESIGN-v2.md instead of the deleted addendum
- Delete DESIGN.md and 4 DESIGN-ADDENDUM-*.md files (preserved in the
  top-level streaming-comparisons repo)
- Delete stale TLA+ trace .bin files

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

Simplify the README to focus on essential API patterns. Rename
for_workflow() to create() throughout, condense the topics section,
remove the exactly-once and type-warning sections (these details belong
in DESIGN-v2.md), and update the API reference table with current
parameter signatures. Also fix whitespace alignment in the DESIGN-v2.md
diagram.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

…de pubsub state

The CAN example only showed pubsub_state being passed through, which
could mislead readers into thinking that's all that's needed. Updated to
include a representative application field (items_processed) to make
clear that your own workflow state must also be carried across the CAN
boundary.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

Three related test-quality changes after a Codex challenge pass.

Delete test_continue_as_new_any_typed_fails (and its workflow/input
classes). It exercised default Temporal data converter behavior (an
Any-typed dataclass field deserializes as a dict) rather than a pubsub
concern, and relied on a weak assert_task_fail_eventually that would
pass for any task failure. Replace it with a doc note on init_pubsub()
warning about Any-typed pubsub_state fields, keeping the guidance where
a user looks when wiring up CAN.

Strengthen test_continue_as_new_properly_typed. Previously it only
verified that log contents and offsets survived CAN. Now it also
verifies that publisher dedup state survives: it seeds
publisher_id="pub" sequence=1, CANs, and asserts on publisher_sequences
directly via a new query handler. Three assertions — after CAN, after a
duplicate publish, and after a fresh-sequence publish — bracket the
dedup contract without inferring it from log length. Inline the
previously shared _run_can_test helper, since only one caller remained.

Widen the TTL test margins from (0.3s sleep, 0.1s TTL) to (1.0s sleep,
0.5s TTL). The tighter margin left ~100 ms of headroom on each side for
pub-old to prune and pub-new to survive — borderline on slow CI, where
worker scheduling between publish and query can itself exceed 100 ms.
The new margins tolerate multi-hundred-ms scheduling jitter in both
directions.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

Four sets of function-local imports had no technical justification — no
circular imports, no optional dependencies, no heavy-module deferral
benefit for a test file. They were editorial drift from incremental
additions. Move them to the top of the file:
- WorkflowUpdateFailedError (was local in the truncate-error test)
- unittest.mock.patch (was duplicated in two retry tests)
- temporalio.api.nexus.v1, temporalio.api.operatorservice.v1 (was local
  in the create_cross_namespace_endpoint helper)
- google.protobuf.duration_pb2, temporalio.api.workflowservice.v1 (was
  local in the cross-namespace Nexus test)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
PubSubClient.__aexit__ could silently drop items on context-manager
exit. A single _flush() processes either pending OR buffer (if/elif), so
when the flusher task was cancelled mid-signal (pending set) while the
producer had added more items (buffer non-empty), the final flush
handled pending and left the buffered items orphaned. Real impact: agent
streaming that publishes a last token and immediately exits the context
manager could silently drop trailing tokens, depending on timing. Fix by
draining both in a loop until pending and buffer are empty.

This bug was latent in test_max_batch_size because that test's activity
loop had no awaits — the flusher never ran during the loop, so pending
never accumulated concurrently with buffer. Strengthening the test
exposed it.

Test changes:
- test_max_batch_size: add an await asyncio.sleep(0) between publishes
  (matching real agent workloads that yield on every LLM token) and
  assert via the publisher_sequences query that max_batch_size actually
  triggers >= 2 mid-loop flushes, not a single exit flush. Without this,
  the test passed even if max_batch_size were ignored entirely.
- test_replay_safety: assert the full ordered 7-item sequence and
  offsets rather than just the endpoints. Endpoint-only checks would
  miss mid-stream replay corruption (reordering, duplication, drops).
- test_poll_truncated_offset_returns_application_error: add a comment
  explaining why pytest.raises(WorkflowUpdateFailedError) suffices to
  prove the handler raised ApplicationError — Temporal's update protocol
  completes with this error only for ApplicationError; other exceptions
  fail the workflow task instead, causing execute_update to hang rather
  than raise.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
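The if/elif single-pass bug and the drain-loop fix can be shown side by side in a synchronous toy model; `send` stands in for the signal RPC and all names are illustrative:

```python
class DrainingClient:
    """Toy model of the __aexit__ fix: keep flushing until BOTH the
    in-flight `pending` batch and the `buffer` of newer items are empty,
    instead of a single if/elif pass that can leave one behind."""

    def __init__(self, send):
        self._send = send
        self.buffer: list[bytes] = []
        self.pending: list[bytes] = []

    def publish(self, data: bytes) -> None:
        self.buffer.append(data)

    def _flush_once(self) -> None:
        # The buggy exit path called this exactly once: pending OR buffer.
        if self.pending:
            self._send(self.pending)
            self.pending = []
        elif self.buffer:
            self.pending, self.buffer = self.buffer, []
            self._send(self.pending)
            self.pending = []

    def close(self) -> None:
        while self.pending or self.buffer:  # drain both, not just one
            self._flush_once()
```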
Address a small set of stylistic issues flagged during review.

Fix the stale docstring on PollResult: the field is more_ready, not
has_more. Readers following the docstring would have looked for a
non-existent attribute.

Add generic parameters to the WorkflowHandle annotation in
PubSubClient.__init__ (WorkflowHandle[Any, Any]). This matches the
treatment applied earlier in the tests; PubSubClient is polymorphic over
workflow types.

Rename the signal/update handler parameters in PubSubMixin from `input`
(which shadowed the builtin) to `payload`. The type names (PublishInput,
PollInput) already convey "input," so the parameter name was redundant.
Drop the now-unnecessary `# noqa: A002` on the validator.

Clarify the PubSubClient.__init__ docstring about continue-as-new:
previously it said "prefer create() when you need CAN following"; now it
explicitly notes that the direct-handle form does not follow CAN and
will stop yielding once the original run ends.

Run `ruff check --select I --fix` and `ruff format` to bring the module
and tests into line with project lint.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

Four changes responding to review comments on sdk-python PR #1423:

C1 (init_pubsub pattern). Docstrings, the README, and DESIGN-v2.md now
advise a single call site from @workflow.init with prior_state threaded
through the workflow input, instead of the previous "call in __init__
for fresh, in run() for CAN" split. The signature is unchanged
(prior_state is still optional and defaults to None) — the change is to
the blessed pattern.

C2 (rename priority -> force_flush). PubSubClient.publish() renames the
kwarg to force_flush. The kwarg never implied ordering — it just forces
an immediate flush of the buffer — so the new name is accurate. Internal
test helpers, comments, and docs updated.

C3 (split create / from_activity). PubSubClient.create() now requires an
explicit (client, workflow_id); the silent auto-detect path is gone. A
new PubSubClient.from_activity() classmethod pulls both from the current
activity context. This removes the failure mode where omitting args
outside an activity produced a confusing runtime error. Activity-side
test helpers migrated to from_activity().

C5 (truncation rationale). DESIGN-v2.md section 10 no longer describes
truncation as "deferred to a future iteration" — the feature is
implemented, and voice streaming workflows have shown it's needed in
practice. Because CAN is the standard pattern for long-running
workflows, workflow history size is not the binding constraint;
in-memory log growth between CAN boundaries is. The section now says so.

Tests pass (23/23, pytest tests/contrib/pubsub/).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Addresses PR #1423 review comment C4: expose Temporal Payload at the
PubSubItem / PublishEntry boundary so subscribers can decode via
subscribe(result_type=T), matching execute_update(result_type=...).

API changes:
- PubSubMixin.publish(topic, value): value is any payload-convertible
  object or a pre-built Payload (zero-copy).
- PubSubClient.publish(topic, value, force_flush=False): same shape;
  defers conversion to flush time, so the batching cost is amortized.
- PubSubClient.subscribe(topics, *, result_type=None, ...): yields
  PubSubItem whose data is a Payload by default, or the decoded
  result_type when one is supplied.
- PubSubItem.data is now Any (Payload | decoded value).

Wire format and codec decisions:
- PublishEntry.data / _WireItem.data are
  base64(Payload.SerializeToString()). A nested Payload inside a
  dataclass fails with "Object of type Payload is not JSON serializable"
  because the default JSON converter only special-cases top-level
  Payloads in signal/update args. The base64-of-serialized-proto wire
  format keeps the JSON envelope while preserving Payload.metadata
  end-to-end. The round trip is guarded by the new
  test_payload_roundtrip_prototype.py tests.
- Per-item encoding uses the SYNC payload converter
  (workflow.payload_converter() on the mixin,
  client.data_converter.payload_converter on the client). The codec
  chain (encryption, PII redaction, compression) is NOT invoked per
  item — Temporal already runs the user's DataConverter.encode on the
  __pubsub_publish signal envelope and the __pubsub_poll update
  response, so running the codec per item as well would
  double-encrypt/compress (and compressing already-encrypted bytes
  defeats the codec). The per-item Payload still carries encoding
  metadata ("encoding: json/plain", "messageType: ...") which is what
  the subscribe(result_type=T) decode path actually needs.
- The workflow side and client side are now codec-symmetric; the
  previously feared asymmetry does not exist.

Tests:
- Existing pubsub tests updated: collect_items takes the Client (needed
  to reach the payload converter), and subscribe calls pass
  result_type=bytes where they compare against raw bytes.
- Added test_structured_type_round_trip: a workflow publishes dataclass
  values and the subscriber decodes via result_type= — exercising the
  primary value-add of the migration.
- Added test_payload_roundtrip_prototype.py as a regression guard for
  the wire-format choice: one test asserts nested Payload in a dataclass
  fails, another asserts base64(proto(Payload)) round-trips.

All 26 pubsub tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The bridge's Cargo.toml requires temporalio-client = "0.2.0" (set in
68561ee), but commit c4ec6e7 ("Update pubsub README: rename for_workflow
→ create") inadvertently reverted the sdk-core submodule pointer to
f188eb53, a commit that still had the client crate at 0.1.0. This left
uv/maturin unable to build the Rust bridge on this branch: Cargo
resolves the requirement against the vendored crate and rejects 0.1.0
for the "^0.2.0" spec.

Restore the pointer to b544f95d — the commit origin/main uses with the
same Cargo.toml, so the bridge and its sdk-core workspace are consistent
again. No Python code changes; this is purely a submodule pointer fix.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Reconciles DESIGN-v2.md with the "Streaming API Design Considerations"
Notion page so both track the authoritative Python implementation. The
Notion page had richer narrative (durable-streams framing, pull-vs-push
reasoning, one-way-door callouts, an offset-options comparison table, an
alternatives-considered list for wire evolution, and the
end-to-end-principle writeup). This change brings that into the in-repo
doc.

Changes:
- New top-of-doc note establishing that the Python code in
  sdk-python/temporalio/contrib/pubsub/ is authoritative; both
  DESIGN-v2.md and the Notion page track it.
- New Decision #1 "Durable streams" explaining the durable-by-default
  choice vs ephemeral streams (simpler model, reliability, correctness).
  Existing decisions renumbered.
- Decision #4 (Global offsets) gains the 6-option ecosystem comparison
  table and a one-way-door callout flagging the wire-protocol commitment.
- Decision #9 (Subscription is poll-based) expanded with the
  pull-vs-push trade-off (back-pressure, subscriber-controlled read
  position, data-at-rest) and explicit "both layers are exposed" framing.
- New "Design Principles" section with the Saltzer/Reed/Clark
  end-to-end-dedup framing and the "retries remain in the log" contract,
  with a one-way-door callout on the append-only-of-attempts contract.
- The compatibility section gains a full alternatives-considered list
  (version field, versioned handler names, protocol negotiation, SDK
  version embedding, accepting silent incompatibility) and a two-part
  one-way-door callout on immutable handler names + no version field.
- New "Ecosystem analogs" section: a compact one-paragraph summary (NATS
  JetStream for offsets, Kafka for idempotent producers, Redis for
  blocking pull, the Workflow SDK as the durable-execution peer) with a
  pointer to the Notion page for the full comparison tables.

The Notion page itself is still behind on the Payload migration
(Decision #5 "Opaque message payloads" needs rewriting; API signatures
still show priority= and data: bytes). That update is deferred pending
resolution of an open reviewer discussion on activity-retry/dedup
(discussion 34a8fc56-7738-808c-b29b-001c5066e9d2) whose substance
overlaps with the Decision #5 rewrite.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
# Conflicts:
#	temporalio/contrib/google_adk_agents/_model.py
Follow-ups missed when the contrib/pubsub refactor renamed
PubSubClient.create(batch_interval=...) → PubSubClient.from_activity(...)
and publish(..., priority=True) → publish(..., force_flush=True). Both
plugin activities still called the old signatures and failed at runtime
with a TypeError on the first publish.

Also update the streaming tests to pass result_type=bytes to
pubsub.subscribe(); after the bytes→Payload migration, item.data is a
raw Payload unless a result_type is specified, so json.loads(item.data)
was raising a TypeError.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Users no longer inherit a mixin class. Instead, they construct
`PubSub(prior_state=...)` from `@workflow.init`; the constructor
registers the `__pubsub_publish` signal, `__pubsub_poll` update (with
validator), and `__pubsub_offset` query handlers dynamically via
`workflow.set_signal_handler`, `set_update_handler`, and
`set_query_handler`. The pub/sub wire contract (handler names, payload
shapes, offset semantics) is unchanged.
This matches how other-language SDKs will express the same pattern —
imperative handler registration from inside the workflow body rather
than inheritance — and lets the workflow retain its normal single base
class.
The constructor raises RuntimeError in two misuse cases:
1. Called twice on the same workflow — detected via
`workflow.get_signal_handler("__pubsub_publish") is not None`.
2. Called from anywhere other than `__init__` — detected by
inspecting the immediate caller's frame. History-length based
detection was tried first but has two false positives (pre-start
signals inflate first-task history length beyond 3, and cache
eviction legitimately re-runs `__init__` with a higher current
history length), so frame inspection is the correct mechanism.
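The frame-inspection mechanism can be seen in miniature with plain `inspect` (a hypothetical stand-in, not the module's exact code):

```python
import inspect


class Broker:
    def __init__(self) -> None:
        # The immediate caller's frame is one level up the stack; require
        # that it is an __init__ so construction happens during workflow init.
        caller = inspect.stack()[1]
        if caller.function != "__init__":
            raise RuntimeError("Broker() must be constructed from __init__")


class GoodWorkflow:
    def __init__(self) -> None:
        self.broker = Broker()  # OK: caller frame is __init__


class BadWorkflow:
    def run(self) -> None:
        self.broker = Broker()  # RuntimeError: caller frame is "run"
```

Unlike history-length heuristics, this check depends only on the shape of the Python call stack, so pre-start signals and cache eviction cannot perturb it.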
Method renames on the broker (no longer needed as `_pubsub_*` prefixes
now that they live on a dedicated object):
init_pubsub(prior_state=None) -> PubSub(prior_state=None)
self.publish(topic, value) -> self.pubsub.publish(topic, value)
self.get_pubsub_state(...) -> self.pubsub.get_state(...)
self.drain_pubsub() -> self.pubsub.drain()
self.truncate_pubsub(up_to) -> self.pubsub.truncate(up_to)
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Fan-out: add a subsection under Design Decision 9 explaining that each __pubsub_poll is an independent update RPC with no shared delivery, so items destined for N subscribers cross the wire N times. Spells out the three concurrent-subscriber shapes (same topic/offset, different offsets, disjoint topics) and the rationale for the per-poll model. Future Work: new top-level section with three items — shared workflow fan-out (optimization of the above), workflow-defined filters and transforms, and a safe workflow-side subscribe() API. Each entry names the relevant design questions left open rather than prescribing an implementation. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The streaming activity previously maintained a normalization layer: ~50 lines of if/elif mapping OpenAI event types (response.output_text.delta, response.reasoning_summary_*, etc.) to custom app event names (TEXT_DELTA, THINKING_*, LLM_CALL_START/COMPLETE), plus text-delta accumulation into a synthesized TEXT_COMPLETE, plus a function-call filter on output_item.added. That normalization made sense when a shared UI consumed events from multiple providers, but each provider-plugin should expose its native event stream and let consumers render idiomatically. The activity now publishes each yielded OpenAI event as its Pydantic JSON and returns the ModelResponse built from the final ResponseCompletedEvent — three lines inside the stream loop. Also factored out three helpers shared between the streaming and non-streaming activities (both paths were duplicating them verbatim): _build_tools_and_handoffs — tool/handoff reconstruction from dataclass form _build_tool — single tool-by-type dispatch _raise_for_openai_status — APIStatusError -> retry-posture translation The local-activity guard in _temporal_model_stub.py gains a comment explaining the two reasons streaming can't use local activities (no heartbeat channel, no pubsub signal context from the activity). Tests: replaced the normalized-event assertions with raw-event assertions; dropped the rich-dispatcher coverage test since there's no dispatcher left to cover. 115 passing / 16 skipped. Downstream impact: consumers that depend on the normalized event names (temporal-streaming-agents-samples frontend, shared-frontend hooks) need to switch on raw OpenAI event types instead. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- ruff format: apply the formatter to auto-generated style changes.
- pyright: replace dict literals for Response.text/usage with the pydantic model types (ResponseTextConfig, ResponseUsage, InputTokensDetails, OutputTokensDetails).
- basedpyright: suppress reportUnusedFunction on the private _encode_payload/_decode_payload helpers in pubsub._types (they are used from sibling modules, which basedpyright does not credit) and reportUnusedParameter on the CAN workflow run() input arg.
- pydocstyle: add docstrings to PubSubClient.__aenter__/__aexit__.
- typing.Self requires 3.11; import it from typing_extensions like the rest of the SDK does.
- asyncio.timeout requires 3.11; fall back to async_timeout.timeout on 3.10 (async_timeout is an aiohttp transitive dependency there).
On Python 3.10 CI, the `if sys.version_info >= (3, 11):` branch is what basedpyright flags as unreachable. The ignore needs to be on both branches so it is silent under every Python version in the matrix.
The previous attempt placed the pragma on the indented `timeout as _async_timeout` line, but basedpyright reports reportUnreachable against the outer `from ... import (` line (the block-opening statement), so the pragma had no effect. Move the ignore up to the import line and combine with reportMissingImports there. Locally verified clean on Python 3.10, 3.11, and 3.14 via `uv run --python <ver> poe lint`.
Under parallel test load we saw test_poll_truncated_offset_returns_application_error fail with "Cannot truncate to offset 3: only 0 items exist" — traced to an activation-ordering race. When a workflow receives an activation containing [InitializeWorkflow, Signal(__pubsub_publish), Update(truncate)] in one batch, _WorkflowInstanceImpl.activate groups signals and updates into job_sets[1] and init into job_sets[2]. During _apply of job_sets[1], __pubsub_publish (a dynamic signal registered inside PubSub.__init__) has no handler yet, so it is buffered; truncate is a class-level @workflow.update, found in self._updates at activation time, and its task is created immediately and queued in self._ready. _run_once then lazy-instantiates the workflow; __init__ runs set_signal_handler, which dispatches the buffered signal via a new task appended to self._ready after the update task. FIFO event-loop dispatch runs truncate against an empty log first; the handler raised ValueError, which poisoned the whole workflow task.

Fixes:
1. temporalio/contrib/pubsub/_broker.py — PubSub.truncate now raises ApplicationError(type="TruncateOutOfRange", non_retryable=True) instead of ValueError when the offset is past the end of the log. Matches what _on_poll already does for TruncatedOffset and lets update handlers surface the error cleanly without failing the task.
2. tests/contrib/pubsub/test_pubsub.py — TruncateWorkflow seeds the log from @workflow.init with a prepub_count arg. Three tests (test_poll_truncated_offset_returns_application_error, test_subscribe_recovers_from_truncation, test_truncate_pubsub) now pass prepub_count=5 to start_workflow rather than sending a client-side __pubsub_publish signal, sidestepping the dynamic-signal-before-init race entirely.
3. Tighten the poll-after-truncation assertion to check cause.type == "TruncatedOffset", and add test_truncate_past_end_raises_application_error to cover the new TruncateOutOfRange branch of PubSub.truncate.
4. temporalio/contrib/pubsub/_client.py — pydoctor couldn't resolve :class:`~temporalio.api.common.v1.Payload` against the generated proto module and was failing the docs build; switched that one cross-ref to plain backticks.

Verified locally on Python 3.10 and 3.14: full lint clean, docs build clean, and pubsub tests pass 27/27 across three parallel runs.
Add a visible "Gotcha" section to the contrib/pubsub README covering the case where a custom synchronous update or signal handler reads PubSub state and races a same-activation __pubsub_publish signal. The race is inherent to registering __pubsub_publish dynamically from @workflow.init: on the first activation the signal is buffered until __init__ runs, and any class-level sync handler scheduled in the same activation observes pre-publish state. Framing in the README distinguishes the two cases where users do or don't need to care: - Independent producer/consumer shape (the common PubSub use): the handler already has to tolerate out-of-order arrival for reasons unrelated to this race, so no recipe is required. - Sequential same-client publish->update ordering: use the recipe. Recipe is a one-line "await asyncio.sleep(0)" at the top of the handler, which is a pure asyncio yield with no Temporal timer, no history events, and no server round trip. Explicit call-out that workflow.sleep(0) is not a substitute. Also extend SIGNAL-UPDATE-RACE.md with a "Zooming out" section that explains why the application layer typically subsumes this race, and update the Recommendation to treat the SDK-level dispatch fix (option 4) as optional follow-up rather than a must-fix. The PubSub class docstring gets a short note pointing at the README. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
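The effect of the recipe can be demonstrated with plain asyncio, no Temporal involved — a toy model of the same-activation dispatch order, not Temporal's actual scheduler. A "publish" task queued before the handler yields is guaranteed to run during the handler's single yield point:

```python
import asyncio


async def main() -> list:
    state = {"published": False}
    order: list = []

    async def handler() -> None:
        # The README recipe: one pure-asyncio yield, no Temporal timer,
        # no history event. The already-queued publish task runs here.
        await asyncio.sleep(0)
        order.append(f"handler sees published={state['published']}")

    async def publish() -> None:
        state["published"] = True
        order.append("publish")

    # Handler is queued first, mirroring the update-before-buffered-signal
    # dispatch order that creates the race.
    t1 = asyncio.create_task(handler())
    t2 = asyncio.create_task(publish())
    await asyncio.gather(t1, t2)
    return order
```

Without the `await asyncio.sleep(0)`, the handler body would run to completion before publish was ever scheduled and would observe published=False.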
The existing TruncateWorkflow sidestepped the dynamic-signal-vs-update race by seeding the log from @workflow.init via prepub_count. That kept CI green but meant the test workflow did not exercise the pattern the README now asks users to follow (await asyncio.sleep(0) at the top of sync-shaped handlers reading PubSub state). Make truncate async with the recipe so the test workflow is a living example of the documented pattern, and simplify the docstring now that the race is closed in the handler rather than avoided via init-time seeding. prepub_count is kept as a convenience for the error-path tests that just need deterministic log content. All four truncate tests still pass. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
```python
@workflow.init
def __init__(self, input: WorkflowInput) -> None:
    self.items_processed = input.items_processed
    self.pubsub = PubSub(prior_state=input.pubsub_state)
```
one suggestion I can think of to simplify the continue as new based on the conversation in today's code walkthrough meeting:
```python
from typing import Callable

from temporalio import workflow


class PubSub[T]:
    def __init__(
        self,
        *,
        prior_state: PubSubState | None = None,
        build_continue_as_new_args: Callable[[], list[T]],
    ):
        self._build_continue_as_new_args = build_continue_as_new_args
        # ...

    async def continue_as_new(self):
        self.drain()
        await workflow.wait_condition(workflow.all_handlers_finished)
        workflow.continue_as_new(args=self._build_continue_as_new_args())


@workflow.defn
class MyWorkflow:
    @workflow.init
    def __init__(self, input: WorkflowInput) -> None:
        self.items_processed = input.items_processed
        self.pubsub: PubSub[WorkflowInput] = PubSub(
            prior_state=input.pubsub_state,
            build_continue_as_new_args=lambda: [WorkflowInput(
                items_processed=self.items_processed,
                pubsub_state=self.pubsub.get_state(),
            )],
        )

    @workflow.run
    async def run(self, input: WorkflowInput) -> None:
        # ... do work, updating self.items_processed ...
        if workflow.info().is_continue_as_new_suggested():
            await self.pubsub.continue_as_new()
```

One way I think this would be useful is that it pushes the user to think about the continue-as-new scenario right out of the gate. And by placing build_continue_as_new_args in the constructor while simultaneously requiring the user to instantiate PubSub(...) in the workflow's init, you force them to realize that they'd necessarily need to pass the pubsub state in their workflow input if they want to make use of CAN like this.
flush() is an explicit synchronization point: it returns once items buffered at call time have been signaled to the workflow and acknowledged by the server, and returns immediately when the buffer is empty. It complements the two existing flush mechanisms (force_flush=True on publish, context-manager exit) for the case where the caller needs proof that prior publications landed but the moment doesn't naturally correspond to a specific event. Implementation reuses _flush() under the existing flush_lock, looped while either _pending or _buffer is non-empty so the pending-vs-buffer staging in _flush() can drain in one call. DESIGN-v2 updates the API table and replaces the "no public flush()" paragraph with a section framing the three complementary flush mechanisms and when each is appropriate. Test test_explicit_flush_barrier exercises the documented contract: empty-buffer no-op, flush as a barrier with batch_interval=60s so a regression hangs rather than passing on the timer, and idempotent second flush. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
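A condensed model of the barrier semantics described here — looping while either the pending or buffered stage is non-empty, under a single flush lock (illustrative names, not the client's real internals):

```python
import asyncio
from typing import Any, Callable, Coroutine, List


class Batcher:
    def __init__(self, send: Callable[[List[Any]], Coroutine]) -> None:
        self._send = send             # delivers one batch and awaits the ack
        self._buffer: List[Any] = []  # items not yet staged for sending
        self._pending: List[Any] = [] # items staged but not yet acknowledged
        self._flush_lock = asyncio.Lock()

    def publish(self, item: Any) -> None:
        self._buffer.append(item)

    async def flush(self) -> None:
        """Barrier: returns once everything buffered at call time has been
        sent and acknowledged; an immediate no-op when both stages are
        empty. Looping over both stages lets a single call drain items
        that arrive while an earlier batch is in flight."""
        async with self._flush_lock:
            while self._buffer or self._pending:
                self._pending, self._buffer = self._buffer, []
                if self._pending:
                    await self._send(self._pending)
                self._pending = []
```

With a long batch interval, a regression in this loop shows up as a hang rather than a timer-masked pass — the same property the test relies on.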
Workflow-side (publisher_id, sequence) dedup is a polyfill for two
gaps in Temporal's built-in signal request_id dedup:
1. The Python SDK does not expose request_id on
WorkflowHandle.signal(), so cross-_flush() retries always allocate
a fresh request_id and bypass server-side dedup even within a
single run.
2. pendingSignalRequestedIDs is per-run mutable state and is not
copied across continue-as-new, so retries that straddle CAN are
accepted as fresh signals (verified empirically on dev server and
Temporal Cloud — see experiments/can-signal-dup/README.md).
When (1) and (2) are both fixed, the workflow-side check becomes
redundant. The dedup keys at both layers already align on
(publisher_id, sequence), so the migration is mechanical — pin
request_id=f"{publisher_id}:{seq}" in _flush(), drop the dedup
branch in _on_publish, retire publisher_sequences /
publisher_last_seen / publisher_ttl from PubSubState in a follow-up
wire-format pass.
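The workflow-side check being polyfilled amounts to a per-publisher high-water mark on sequence numbers — sketched here with the field names above, all other details illustrative:

```python
from typing import Any, Dict, List


class DedupLog:
    def __init__(self) -> None:
        self.log: List[Any] = []
        # publisher_id -> highest sequence number accepted so far
        self.publisher_sequences: Dict[str, int] = {}

    def on_publish(self, publisher_id: str, seq: int, item: Any) -> bool:
        """Accept the item only if seq advances this publisher's high-water
        mark. A retried signal replays an already-seen seq and is dropped —
        the same outcome server-side request_id dedup would produce once
        gaps (1) and (2) are closed."""
        last = self.publisher_sequences.get(publisher_id, -1)
        if seq <= last:
            return False  # duplicate delivery of an already-accepted publish
        self.publisher_sequences[publisher_id] = seq
        self.log.append(item)
        return True
```

Because both layers key on (publisher_id, sequence), pinning request_id to that pair server-side would make this branch dead code, which is what makes the planned migration mechanical.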
Adds a "Future Work" subsection in DESIGN-v2 capturing the
prerequisites, the diff (what changes / stays / goes), and the
rollout sequencing. Adds short pointer comments at the two code
sites that would change so a future maintainer encounters the design
note at the right place.
No behavior change.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Convenience for single-topic subscribers — the common case. The previous signature required wrapping a single topic in a list, which is noisy at every call site. Internally we normalize to a list before issuing the poll update; behavior for None / empty list / multi-topic list is unchanged. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
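The normalization is a few lines — a sketch of the described behavior, with a hypothetical standalone name (the real logic lives inside subscribe()):

```python
from typing import List, Optional, Sequence, Union


def normalize_topics(
    topic: Union[str, Sequence[str], None],
) -> Optional[List[str]]:
    """Accept a bare topic string for the common single-topic case; None
    (all topics) and explicit sequences pass through unchanged."""
    if topic is None:
        return None
    if isinstance(topic, str):
        return [topic]
    return list(topic)
```

Checking `isinstance(topic, str)` before the sequence branch matters: strings are themselves sequences, so the order prevents "orders" from being split into single characters.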
Rename the wire-level handler identifiers to follow the existing __temporal_ convention (__temporal_workflow_metadata, __temporal_activity_definition, etc.) so they are clearly recognizable as Temporal-internal and won't collide with user-defined handlers: __pubsub_publish -> __temporal_pubsub_publish __pubsub_poll -> __temporal_pubsub_poll __pubsub_offset -> __temporal_pubsub_offset Updates the broker/client implementation, tests, and design docs. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- _broker.py:_validate_poll — rename `payload` to `_payload`, drop `del payload` and `# noqa: ARG002`. The noqa was dead code: CI runs only `ruff check --select I` (import sort), so ARG rules never fire. The underscore prefix silences basedpyright's reportUnusedParameter cleanly.
- test_pubsub.py:ContinueAsNewTypedWorkflow.run — rename `input` to `_input` with `del _input`, drop the `type: ignore`. Now matches the existing `_prepub_count` pattern at TruncateWorkflow.run for the same @workflow.init/@workflow.run signature constraint.
- test_pubsub.py async_timeout import — declare `async-timeout` as an explicit dev dep gated on `python_version < '3.11'`, drop the `reportMissingImports` half of the test pragma. Closes the audit gap of relying on aiohttp's transitive dependency on 3.10.

Kept the `reportUnreachable` ignores — still needed because basedpyright resolves `sys.version_info` against its own runtime, not the matrix Python.

Verified `poe lint` clean on Python 3.10, 3.11, and 3.14.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
What was changed

Adds temporalio.contrib.pubsub, a reusable pub/sub primitive for streaming data out of Temporal workflows.

Why?

Streaming incremental results from long-running workflows (e.g., AI agent token streams, progress updates) is a common need with no built-in solution. This module provides a correct, reusable implementation so users don't have to roll their own poll/signal/dedup logic.

Checklist

Closes — N/A (new contrib module, no existing issue)

How was this tested?

tests/contrib/pubsub/test_pubsub.py, covering batching, flush safety, CAN serialization, replay guards, dedup (TTL pruning, truncation), offset-based resumption, max_batch_size, drain, and error handling.

Any docs updates needed?

README.md with usage examples and API reference; DESIGN-v2.md, plus addenda covering CAN, dedup, and topic semantics.